home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
AmigActive 23
/
AACD 23.iso
/
AACD
/
Online
/
opennap
/
buffer.c
< prev
next >
Wrap
C/C++ Source or Header
|
2001-06-08
|
12KB
|
492 lines
/* Copyright (C) 2000-1 drscholl@users.sourceforge.net
This is free software distributed under the terms of the
GNU Public License. See the file COPYING for details.
$Id: buffer.c,v 1.65 2001/02/15 08:39:45 drscholl Exp $ */
#ifndef WIN32
#include <unistd.h>
#include <sys/time.h>
#else
#include <windows.h>
#endif /* !WIN32 */
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include "opennap.h"
#include "debug.h"
/* Allocate a zero-filled BUFFER header together with a data area of
   BUFFER_SIZE bytes.  Returns the new buffer, or 0 when either allocation
   fails (the failure is reported through OUTOFMEMORY and nothing is
   leaked). */
static BUFFER *
buffer_new (void)
{
    BUFFER *buf;

    buf = CALLOC (1, sizeof (BUFFER));
    if (buf == 0)
    {
        OUTOFMEMORY ("buffer_new");
        return 0;
    }
#if DEBUG
    /* tag the header so buffer_validate() can recognise it */
    buf->magic = MAGIC_BUFFER;
#endif
    buf->data = MALLOC (BUFFER_SIZE);
    if (buf->data != 0)
    {
        buf->datamax = BUFFER_SIZE;
        return buf;
    }
    /* data area could not be allocated: release the header again */
    OUTOFMEMORY ("buffer_new");
    FREE (buf);
    return 0;
}
/* Append `dsize' bytes starting at `d' to the end of the buffer list `b'
   (which may be 0 for an empty list).  Data is copied into the free space
   of the last buffer first; new buffers are linked in as each one fills.

   Returns the head of the list.  If a new buffer cannot be allocated the
   WHOLE list is freed and 0 is returned, because the stream would
   otherwise be out of sync. */
static BUFFER *
buffer_queue (BUFFER * b, char *d, int dsize)
{
    BUFFER *head = b;
    BUFFER *tail = b;
    int chunk;

    /* locate the last buffer of an existing list */
    if (tail)
    {
        for (; tail->next; tail = tail->next)
            ;
    }

    for (; dsize > 0; dsize -= chunk, d += chunk)
    {
        if (!tail)
        {
            /* empty list: the new buffer becomes the head */
            head = tail = buffer_new ();
        }
        else if (tail->datasize == tail->datamax)
        {
            /* last buffer is full: chain a fresh one behind it */
            tail->next = buffer_new ();
            tail = tail->next;
        }

        if (!tail)
        {
            /* allocation failed; drop everything queued so far */
            buffer_free (head);
            return 0;
        }

        /* copy as much as fits into the remaining space */
        chunk = dsize;
        if (chunk > tail->datamax - tail->datasize)
            chunk = tail->datamax - tail->datasize;
        memcpy (tail->data + tail->datasize, d, chunk);
        tail->datasize += chunk;
    }
    return head;
}
/* Mark `n' more bytes of the first buffer in list `b' as consumed.
   When the first buffer has been consumed completely it is freed and the
   following buffer (possibly 0) is returned as the new head; otherwise
   `b' itself is returned. */
BUFFER *
buffer_consume (BUFFER * b, int n)
{
    BUFFER *rest;

    ASSERT (buffer_validate (b));
    ASSERT (b->consumed + n <= b->datasize);

    b->consumed += n;
    if (b->consumed < b->datasize)
        return b;               /* unread data remains in this buffer */

    /* nothing left in the head buffer: release it and advance */
    rest = b->next;
    FREE (b->data);
    FREE (b);
    return rest;
}
/* Link list `b' behind the last buffer of list `a' and return the head of
   the combined list.  When `a' is 0 the result is simply `b'.  `b' must
   not be 0. */
BUFFER *
buffer_append (BUFFER * a, BUFFER * b)
{
    BUFFER *tail;

    ASSERT (b != 0);
    if (a == 0)
        return b;
    ASSERT (buffer_validate (a));

    /* walk to the final buffer of `a' */
    for (tail = a; tail->next; tail = tail->next)
        ;
    tail->next = b;
    return a;
}
/* Return the number of not yet consumed bytes held in the whole buffer
   list `b'.  An empty list (0) yields 0. */
int
buffer_size (BUFFER * b)
{
    int total = 0;

    ASSERT (b == 0 || buffer_validate (b));
    while (b)
    {
        total += b->datasize - b->consumed;
        b = b->next;
    }
    return total;
}
/* Release every buffer in list `b', including each data area.  Passing 0
   is allowed and does nothing. */
void
buffer_free (BUFFER * b)
{
    BUFFER *following;

    ASSERT (b == 0 || buffer_validate (b));
    for (; b; b = following)
    {
        /* remember the successor before the node goes away */
        following = b->next;
        FREE (b->data);
        FREE (b);
    }
}
#if DEBUG
/* Debug-only sanity check of the first buffer in list `b'.
   Returns 1 when the buffer looks sane, 0 as soon as any check fails:
     - the header is a valid allocation carrying the MAGIC_BUFFER tag
     - datasize does not exceed datamax
     - the data area (if any) covers at least datasize bytes
     - consumed is either 0 or strictly less than datasize
     - the successor (if any) is a valid allocation large enough to be a
       BUFFER header
   Only the head is inspected; the remainder of the list is not walked. */
int
buffer_validate (BUFFER * b)
{
    ASSERT_RETURN_IF_FAIL (VALID_LEN (b, sizeof (BUFFER)), 0);
    ASSERT_RETURN_IF_FAIL (b->magic == MAGIC_BUFFER, 0);
    ASSERT_RETURN_IF_FAIL (b->datasize <= b->datamax, 0);
    ASSERT_RETURN_IF_FAIL (b->data == 0
                           || VALID_LEN (b->data, b->datasize), 0);
    ASSERT_RETURN_IF_FAIL (b->consumed == 0 || b->consumed < b->datasize, 0);
    /* check the full header size of the next node, not just the size of a
       pointer, so that a truncated/bogus successor is caught */
    ASSERT_RETURN_IF_FAIL (b->next == 0
                           || VALID_LEN (b->next, sizeof (BUFFER)), 0);
    return 1;
}
#endif /* DEBUG */
/* Compress the unconsumed bytes of the FIRST buffer in list `*b' with
   deflate() on stream `zip'.

   A Z_SYNC_FLUSH is requested when that buffer is the last one in the
   list, otherwise Z_NO_FLUSH is used so zlib may keep accumulating input.
   The input bytes deflate() accepted are consumed from `*b' (which
   advances `*b' to the next buffer once the first is used up).

   Returns a newly allocated buffer list holding the compressed output, or
   0 when no output was produced (zlib is still buffering, or the first
   output buffer could not be allocated).  The caller owns the result. */
static BUFFER *
buffer_compress (z_streamp zip, BUFFER ** b)
{
    BUFFER *r = 0, **pr;
    int n, bytes, flush;

    ASSERT (buffer_validate (*b));

    /* set up the input */
    bytes = (*b)->datasize - (*b)->consumed;
    zip->next_in = (u_char *) (*b)->data + (*b)->consumed;
    zip->avail_in = bytes;

    /* force a flush if this is the last input to compress */
    flush = ((*b)->next == 0) ? Z_SYNC_FLUSH : Z_NO_FLUSH;

    /* set to 0 so we allocate in the loop */
    zip->avail_out = 0;
    pr = &r;
    do
    {
        if (zip->avail_out == 0)
        {
            /* allocate a new buffer to hold the rest of the compressed
               data */
            *pr = buffer_new ();
            if (!*pr)
                break;
            /* mark the buffer as completely full then remove unused data
               when we exit this loop */
            (*pr)->datasize = (*pr)->datamax;
            zip->next_out = (unsigned char *) (*pr)->data;
            zip->avail_out = (*pr)->datasize;
        }
        n = deflate (zip, flush);
        if (n != Z_OK)
        {
            log ("buffer_compress: deflate: %s (error %d)",
                 NONULL (zip->msg), n);
            break;
        }
        pr = &(*pr)->next;
    }
    /* when flushing, keep going until zlib no longer fills the output */
    while (zip->avail_out == 0 && flush == Z_SYNC_FLUSH);

    /* subtract any uncompressed bytes */
    bytes -= zip->avail_in;
    *b = buffer_consume (*b, bytes);

    if (r)
    {
        /* find the link that holds the last output buffer */
        pr = &r;
        while ((*pr)->next)
            pr = &(*pr)->next;

        /* trim the part of the last buffer deflate() did not fill */
        (*pr)->datasize -= zip->avail_out;

        /* The last buffer can end up empty: either the only buffer when
           the input was small and zlib produced nothing yet, or a trailing
           buffer when the previous one was filled exactly.  Unlink and
           free just that empty buffer so that earlier buffers, which hold
           real compressed data, are neither leaked nor lost. */
        if ((*pr)->datasize == 0)
        {
            FREE ((*pr)->data);
            FREE (*pr);
            *pr = 0;
        }
    }
    return r;
}
/* assuming that we receive relatively short blocks via the network (less
   than 16kb), we uncompress all data when we receive it and don't worry
   about blocking.

   Inflates the `insize' bytes at `in' using stream `zip' and appends the
   result to the single buffer `b', growing b->data in 2048 byte steps as
   required (one spare byte beyond datamax is always requested so the
   caller can write a terminating \0).

   Returns 0 on success, -1 if memory ran out or inflate() reported a real
   error.

   NOTE: this is the only buffer_*() function that does not use the memory
   pool.  each server gets its own real input buffer */
int
buffer_decompress (BUFFER * b, z_streamp zip, char *in, int insize)
{
    int n;

    ASSERT (buffer_validate (b));
    ASSERT (insize > 0);

    zip->next_in = (unsigned char *) in;
    zip->avail_in = insize;
    zip->next_out = (unsigned char *) b->data + b->datasize;
    zip->avail_out = b->datamax - b->datasize;
    /* set this to the max size and subtract what is left after the inflate */
    b->datasize = b->datamax;
    do
    {
        /* if there is no more output space left, create some more */
        if (zip->avail_out == 0)
        {
            /* allocate one extra byte to write a \0 char */
            if (safe_realloc ((void **) &b->data, b->datamax + 2049))
            {
                OUTOFMEMORY ("buffer_decompress");
                return -1;
            }
            b->datamax += 2048;
            /* datasize still equals the old datamax here, i.e. the end of
               the data written so far */
            zip->next_out = (unsigned char *) b->data + b->datasize;
            zip->avail_out = b->datamax - b->datasize;
            /* set this to the max size and subtract what is left after the
               inflate */
            b->datasize = b->datamax;
        }
        n = inflate (zip, Z_SYNC_FLUSH);
        if (n == Z_BUF_ERROR && zip->avail_in == 0)
        {
            /* not an error: the previous call filled the output exactly
               and used up all input, so there was simply nothing left to
               do on this pass */
            break;
        }
        if (n != Z_OK)
        {
            log ("buffer_decompress: inflate: %s (error %d)",
                 NONULL (zip->msg), n);
            /* don't report never-written space as valid data */
            b->datasize -= zip->avail_out;
            return -1;
        }
    }
    while (zip->avail_out == 0);

    /* subtract unused bytes */
    b->datasize -= zip->avail_out;
    return 0;
}
/* Set up zlib compression for the server connection `con': allocates the
   inflate (zin) and deflate (zout) streams in con->sopt and initialises
   them, the latter with compression level `level'.

   On an allocation failure neither stream is left allocated and both
   pointers the function touched are 0.  Failures of inflateInit() or
   deflateInit() are only logged. */
void
init_compress (CONNECTION * con, int level)
{
    int n;

    ASSERT (validate_connection (con));
    ASSERT (ISSERVER (con));

    con->sopt->zin = CALLOC (1, sizeof (z_stream));
    if (!con->sopt->zin)
    {
        OUTOFMEMORY ("init_compress");
        return;
    }
    con->sopt->zout = CALLOC (1, sizeof (z_stream));
    if (!con->sopt->zout)
    {
        FREE (con->sopt->zin);
        /* don't leave a dangling pointer behind for later cleanup code */
        con->sopt->zin = 0;
        OUTOFMEMORY ("init_compress");
        return;
    }

    n = inflateInit (con->sopt->zin);
    if (n != Z_OK)
    {
        log ("init_compress: inflateInit: %s (%d)",
             NONULL (con->sopt->zin->msg), n);
    }
    n = deflateInit (con->sopt->zout, level);
    if (n != Z_OK)
    {
        log ("init_compress: deflateInit: %s (%d)",
             NONULL (con->sopt->zout->msg), n);
    }
    log ("init_compress: compressing server stream at level %d", level);
}
/* Shut down and release the zlib streams of `serv'.  A non Z_OK result
   from deflateEnd()/inflateEnd() is logged.  Streams that were never
   allocated (pointer is 0) are skipped, and the pointers are reset to 0
   afterwards so a repeated call is harmless. */
void
finalize_compress (SERVER * serv)
{
    int n;

    if (serv->zout)
    {
        n = deflateEnd (serv->zout);
        if (n != Z_OK)
            log ("finalize_compress: deflateEnd: %s (%d)",
                 NONULL (serv->zout->msg), n);
        FREE (serv->zout);
        serv->zout = 0;
    }
    if (serv->zin)
    {
        n = inflateEnd (serv->zin);
        if (n != Z_OK)
            log ("finalize_compress: inflateEnd: %s (%d)",
                 NONULL (serv->zin->msg), n);
        FREE (serv->zin);
        serv->zin = 0;
    }
}
/* Try to write the pending output of `con' to its socket.
 *
 * For server connections the uncompressed queue (con->sopt->outbuf) is
 * first compressed into con->sendbuf, and if Server_Chunk is > 0 nothing
 * is written until at least that many bytes are waiting in con->sendbuf.
 *
 * Returns 0 on success (including "nothing written because the socket
 * would block" and "waiting for more data"), and -1 when
 *   - the connection is already marked for destruction,
 *   - WRITE failed with an error other than EWOULDBLOCK, EDEADLK or
 *     ENOBUFS, or
 *   - the remaining queue exceeds Server_Queue_Length (server outbuf) or
 *     Client_Queue_Length (client sendbuf).
 * The write-interest flag of the descriptor is set or cleared to match
 * whether output is still pending.
 */
int
send_queued_data (CONNECTION * con)
{
int n;
BUFFER *r;
ASSERT (validate_connection (con));
if (con->destroy)
{
/* connection is being shut down, just ignore it */
clear_write (con->fd); /* just to be sure */
return -1;
}
if (ISSERVER (con))
{
/* compress server output until we have at least 16k waiting (about
* the size of the tcp buffer for the socket)
*/
while (con->sopt->outbuf && buffer_size (con->sendbuf) < 16384)
{
/* buffer_compress will only compress the first buffer in the
* list, so we possibly need to call it multiple times.
*/
r = buffer_compress (con->sopt->zout, &con->sopt->outbuf);
/* no output produced this round; stop and try again on a later call */
if (!r)
break;
con->sendbuf = buffer_append (con->sendbuf, r);
}
/* for large networks, it might be desirable not to send data every
* time through the main loop. this adds support for queuing up
* a larger amount of data before actually doing a write()
*/
if (Server_Chunk > 0)
{
/* check to see if enough data has been accumulated to send */
if (buffer_size (con->sendbuf) < Server_Chunk)
{
clear_write (con->fd); /* turn off check for write */
return 0; /* wait until more data is recv'd */
}
}
}
/* write until the queue is consumed, or we would block */
while (con->sendbuf)
{
/* only the unconsumed part of the first buffer is offered per call */
n = WRITE (con->fd, con->sendbuf->data + con->sendbuf->consumed,
con->sendbuf->datasize - con->sendbuf->consumed);
if (n == -1)
{
/* these three errno values are treated as "try again later";
anything else is fatal for the connection */
if (N_ERRNO != EWOULDBLOCK && N_ERRNO != EDEADLK &&
N_ERRNO != ENOBUFS)
{
clear_write (con->fd); /* just to be sure */
log ("send_queued_data: write: %s (errno %d) for host %s",
strerror (N_ERRNO), N_ERRNO, con->host);
return -1;
}
break;
}
else if (n == 0)
{
log ("send_queued_data: wrote 0 bytes to fd %d", con->fd);
break;
}
/* mark data as written */
con->sendbuf = buffer_consume (con->sendbuf, n);
/* keep track of the outgoing bandwidth */
global.bytes_out += n;
}
/* check to make sure the queue hasn't gotten too big */
if (ISSERVER (con))
{
/* for a server, we will have up to 16k in con->sendbuf, and then
* possibly a lot more in con->sopt->outbuf. the latter
* is uncompressed data.
*/
if (buffer_size (con->sopt->outbuf) > Server_Queue_Length)
{
log ("send_queued_data: output buffer for %s exceeded %u bytes",
con->host, Server_Queue_Length);
return -1;
}
}
else if (buffer_size (con->sendbuf) > Client_Queue_Length)
{
log ("send_queued_data: output buffer for %s exceeded %u bytes",
con->host, Client_Queue_Length);
return -1;
}
if (con->sendbuf || (ISSERVER (con) && con->sopt->outbuf))
/* still need to write */
set_write (con->fd);
else
/* output queue is empty, clear the write bit */
clear_write (con->fd);
return 0;
}
/* Queue `ssize' bytes at `s' for delivery to `con'.
 *
 * Server connections: the data is always appended to the uncompressed
 * queue con->sopt->outbuf.
 * Client connections: ignored when the connection is being destroyed.
 * If nothing is queued yet (and ALWAYS_QUEUE is not defined) an immediate
 * WRITE is attempted and only the unwritten remainder is queued.
 *
 * The connection is destroyed when queuing fails or when the direct write
 * fails with an error other than EWOULDBLOCK.
 */
void
queue_data (CONNECTION * con, char *s, int ssize)
{
    ASSERT (validate_connection (con));

    if (ISSERVER (con))
    {
        /* always queue server data so we can compress it more
         * efficiently.  server connections are always flushed at the end
         * of the main event loop, so set_write() is not needed here
         */
        con->sopt->outbuf = buffer_queue (con->sopt->outbuf, s, ssize);
        if (!con->sopt->outbuf)
            destroy_connection (con);
        return;
    }

    if (con->destroy)
        return;

    if (!con->sendbuf)
    {
        /* no output is pending, so try to send right away and avoid the
         * copy into the queue
         */
#ifndef ALWAYS_QUEUE
        int written = WRITE (con->fd, s, ssize);

        if (written != -1)
        {
            if (written == ssize)
                return;         /* all written, nothing else to do */
            /* only the part that didn't get written must be queued */
            s += written;
            ssize -= written;
        }
        else if (N_ERRNO != EWOULDBLOCK)
        {
            log ("queue_data: %s: write: %s (errno %d)",
                 con->host, strerror (N_ERRNO), N_ERRNO);
            destroy_connection (con);
            return;
        }
        /* else: the socket would block, queue everything */
#endif /* ! ALWAYS_QUEUE */
        /* we want to know when the socket becomes writable again */
        set_write (con->fd);
    }

    con->sendbuf = buffer_queue (con->sendbuf, s, ssize);
    if (!con->sendbuf)
        destroy_connection (con);
}